package test.mq;

import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

public class testrokcet2_PushConsumer {
	/**
	 * 
	 * 当前例子是PushConsumer用法，使用方式给用户感觉是消息从RocketMQ服务器推到了应用客户端。
	 * 
	 * 但是实际PushConsumer内部是使用长轮询Pull方式从MetaQ服务器拉消息，然后再回调用户Listener方法
	 * 
	 */

	public static void main(String[] args) throws InterruptedException, MQClientException {
		test2();
	}

	public static void test1() throws InterruptedException, MQClientException {

		/**
		 * 
		 * 一个应用创建一个Consumer，由应用来维护此对象，可以设置为全局对象或者单例
		 * 
		 * 注意：ConsumerGroupName需要由应用来保证唯一
		 * 
		 */

		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");

		consumer.setNamesrvAddr("192.168.0.147:9876");

		consumer.setInstanceName("Consumber");

		/**
		 * 
		 * 订阅指定topic下tags分别等于TagA或TagC或TagD
		 * 
		 */

		consumer.subscribe("TopicTest1", "TagA || TagC || TagD");

		/**
		 * 
		 * 订阅指定topic下所有消息
		 * 
		 * 注意：一个consumer对象可以订阅多个topic
		 * 
		 */

		consumer.subscribe("TopicTest2", "*");

		consumer.registerMessageListener(new MessageListenerConcurrently() {
			public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,

					ConsumeConcurrentlyContext context) {
				System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs.size());

				MessageExt msg = msgs.get(0);

				if (msg.getTopic().equals("test")) {
					// 执行TopicTest1的消费逻辑

					if (msg.getTags() != null && msg.getTags().equals("TagA")) {
						// 执行TagA的消费

						System.out.println(new String(msg.getBody()));

					}

					else if (msg.getTags() != null && msg.getTags().equals("TagC")) {
						// 执行TagC的消费

						System.out.println(new String(msg.getBody()));

					}

					else if (msg.getTags() != null && msg.getTags().equals("TagD")) {
						// 执行TagD的消费

						System.out.println(new String(msg.getBody()));

					}

				}

				else if (msg.getTopic().equals("test2")) {
					System.out.println(new String(msg.getBody()));

				}

				return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

			}

		});

		/**
		 * 
		 * Consumer对象在使用之前必须要调用start初始化，初始化一次即可
		 * 
		 */

		consumer.start();

		System.out.println("Consumer Started.");

	}

	public static void test2() throws InterruptedException, MQClientException {

		// Instantiate with specified consumer group name.
		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");

		// Specify name server addresses.
		consumer.setNamesrvAddr("192.168.0.147:9876");
		consumer.setVipChannelEnabled(false);
		// Subscribe one more more topics to consume.
		consumer.subscribe("test", "*");
		// Register callback to execute on arrival of messages fetched from brokers.
		consumer.registerMessageListener(new MessageListenerConcurrently() {

			@Override
			public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
				System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
				return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
			}
		});

		// Launch the consumer instance.
		consumer.start();

		System.out.printf("Consumer Started.%n");
	}
}
